package com.snail.common.mq.service.impl;

import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.snail.common.core.constant.CommonConstants;
import com.snail.common.core.utils.StringUtils;
import com.snail.common.mq.constants.MqConstants;
import com.snail.common.mq.core.IMqProducerAndConsumerHandler;
import com.snail.common.mq.core.domain.SnailMqConsumer;
import com.snail.common.mq.core.dto.MqDto;
import com.snail.common.mq.mapper.SnailMqConsumerMapper;
import com.snail.common.mq.service.ISnailMqConsumerService;
import com.snail.common.mq.service.ISnailMqProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;

/**
 * @Description: MQ consumer record handling. Persists the outcome of each consumption
 * attempt, periodically deletes old successful records and re-sends failed messages.
 * @Author: Snail
 * @CreateDate: 2023/11/21 14:29
 * @Version: V1.0
 */
@Service
public class SnailMqConsumerServiceImpl extends ServiceImpl<SnailMqConsumerMapper, SnailMqConsumer> implements ISnailMqConsumerService {

    /** Failed records whose retry count exceeds this value are no longer picked up for retry. */
    private static final int MAX_RETRY_COUNT = 10;

    @Autowired
    private ISnailMqProducerService snailMqProducerService;
    @Autowired
    private IMqProducerAndConsumerHandler mqProducerAndConsumerHandler;

    /**
     * Stores the outcome of one consumption attempt.
     * <p>
     * If no record matches the message (see {@link #getSnailMqConsumer(MqDto)}), a new one is
     * created with a retry count of 0; otherwise the existing record's retry count is
     * incremented. Error message, status and timestamp are overwritten on every call.
     *
     * @param mqDto    the consumed message
     * @param result   {@code true} if consumption succeeded; {@code false} or {@code null} is stored as failure
     * @param errorMsq error message to store with the record (may be {@code null})
     */
    @Override
    public <T> void insertConsumerInfo(MqDto<T> mqDto, Boolean result, String errorMsq) {
        SnailMqConsumer consumer = getSnailMqConsumer(mqDto);
        if (consumer == null) {
            consumer = new SnailMqConsumer();
            consumer.setPriKey(mqDto.getPriKey());
            consumer.setTopic(mqDto.getTopic());
            consumer.setExchange(mqDto.getExchange());
            consumer.setQueue(mqDto.getQueue());
            consumer.setMqType(mqDto.getMqType());
            consumer.setContent(JSON.toJSONString(mqDto.getContent()));
            consumer.setCreateUserId(mqDto.getUserId());
            consumer.setCreateUserName(mqDto.getUserName());
        }
        consumer.setRetryCount(consumer.getRetryCount() == null ? 0 : consumer.getRetryCount() + 1);
        consumer.setErrorMsg(errorMsq);
        // Boolean.TRUE.equals avoids a NullPointerException from unboxing a null result.
        // NOTE(review): the scheduled jobs below filter on CommonConstants.STATUS_SUCCESS /
        // STATUS_FAIL; confirm those constants match the "1" / "0" literals written here.
        consumer.setStatus(Boolean.TRUE.equals(result) ? "1" : "0");
        // The timestamp is refreshed on every attempt, so it reflects the latest one.
        consumer.setCreateTime(LocalDateTime.now());
        this.saveOrUpdate(consumer);
    }

    /**
     * Queries consumer records using the non-empty fields of the given example as filters.
     * All filters are exact matches except the creator name, which is a LIKE match.
     *
     * @param consumer example object carrying the filter values
     * @return the matching records
     */
    @Override
    public List<SnailMqConsumer> selectMqConsumerList(SnailMqConsumer consumer) {
        LambdaQueryWrapper<SnailMqConsumer> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(StringUtils.isNotEmpty(consumer.getTopic()), SnailMqConsumer::getTopic, consumer.getTopic());
        queryWrapper.eq(StringUtils.isNotEmpty(consumer.getMqType()), SnailMqConsumer::getMqType, consumer.getMqType());
        queryWrapper.eq(StringUtils.isNotEmpty(consumer.getExchange()), SnailMqConsumer::getExchange, consumer.getExchange());
        queryWrapper.eq(StringUtils.isNotEmpty(consumer.getQueue()), SnailMqConsumer::getQueue, consumer.getQueue());
        queryWrapper.eq(StringUtils.isNotEmpty(consumer.getPriKey()), SnailMqConsumer::getPriKey, consumer.getPriKey());
        queryWrapper.eq(StringUtils.isNotEmpty(consumer.getCreateUserId()), SnailMqConsumer::getCreateUserId, consumer.getCreateUserId());
        queryWrapper.like(StringUtils.isNotEmpty(consumer.getCreateUserName()), SnailMqConsumer::getCreateUserName, consumer.getCreateUserName());
        // Both bounds use the same value, so this only matches records whose create time equals it.
        // NOTE(review): a real time-range filter would need separate begin/end values.
        queryWrapper.ge(consumer.getCreateTime() != null, SnailMqConsumer::getCreateTime, consumer.getCreateTime());
        queryWrapper.le(consumer.getCreateTime() != null, SnailMqConsumer::getCreateTime, consumer.getCreateTime());
        return baseMapper.selectList(queryWrapper);
    }

    /**
     * Looks up the record for a message by primary key and topic, additionally narrowed by
     * exchange and queue when those are non-empty.
     *
     * @param mqDto the message to look up
     * @return the result of {@code selectOne} for these conditions
     */
    private <T> SnailMqConsumer getSnailMqConsumer(MqDto<T> mqDto) {
        LambdaQueryWrapper<SnailMqConsumer> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SnailMqConsumer::getPriKey, mqDto.getPriKey());
        queryWrapper.eq(SnailMqConsumer::getTopic, mqDto.getTopic());
        queryWrapper.eq(StringUtils.isNotEmpty(mqDto.getExchange()), SnailMqConsumer::getExchange, mqDto.getExchange());
        queryWrapper.eq(StringUtils.isNotEmpty(mqDto.getQueue()), SnailMqConsumer::getQueue, mqDto.getQueue());
        return baseMapper.selectOne(queryWrapper);
    }


    /**
     * Scheduled cleanup, triggered at 00:00:00 on the 1st day of every month: deletes
     * successfully consumed records older than three months.
     * <p>
     * Spring cron expressions have exactly six fields (no year field). The method is public
     * so that it stays invocable if this bean is wrapped in a Spring AOP proxy.
     */
    @Scheduled(cron = "0 0 0 1 * ?")
    public void deleteProducerInfo() {
        // Same cutoff as "three months ago", expressed as LocalDateTime like the values written to createTime.
        LocalDateTime cutoff = LocalDate.now().minusMonths(3).atStartOfDay();
        LambdaUpdateWrapper<SnailMqConsumer> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.lt(SnailMqConsumer::getCreateTime, cutoff);
        updateWrapper.eq(SnailMqConsumer::getStatus, CommonConstants.STATUS_SUCCESS);
        this.baseMapper.delete(updateWrapper);
    }

    /**
     * Scheduled retry, triggered at minute 1 of every hour: rebuilds a message from each
     * failed record whose retry count is at most {@link #MAX_RETRY_COUNT} and passes it to
     * {@code mqProducerAndConsumerHandler.sendMq}.
     * <p>
     * Spring cron expressions have exactly six fields (no year field). The method is public
     * so that it stays invocable if this bean is wrapped in a Spring AOP proxy.
     */
    @Scheduled(cron = "0 1 * * * ?")
    public void retryConsumer() {
        LambdaQueryWrapper<SnailMqConsumer> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SnailMqConsumer::getStatus, CommonConstants.STATUS_FAIL);
        queryWrapper.le(SnailMqConsumer::getRetryCount, MAX_RETRY_COUNT);
        List<SnailMqConsumer> consumerList = this.baseMapper.selectList(queryWrapper);
        for (SnailMqConsumer consumer : consumerList) {
            MqDto<Object> mqDto = new MqDto<>();
            mqDto.setPriKey(consumer.getPriKey());
            mqDto.setTopic(consumer.getTopic());
            mqDto.setExchange(consumer.getExchange());
            mqDto.setQueue(consumer.getQueue());
            // The stored content is the JSON string written by insertConsumerInfo.
            mqDto.setContent(consumer.getContent());
            // Restore the user id from the field it was saved to in insertConsumerInfo.
            mqDto.setUserId(consumer.getCreateUserId());
            mqDto.setUserName(consumer.getCreateUserName());
            // NOTE(review): the stored mqType is not copied onto the retry message; confirm
            // that sendMq does not need it.
            mqProducerAndConsumerHandler.sendMq(mqDto);
        }
    }
}
